🎉 Destination Bigquery: added gcs upload option #5614
Conversation
…igquery (GCS upload mode)
…ead of hardcoded and minor refactor
/test connector=connectors/destination-bigquery
@etsybaev While looking for a workaround for the BigQuery connector failing with large datasets, I first tried manually uploading our tables to GCS, but soon hit a limitation with the current GCS implementation as described in #5720. Since I'm assuming that you're reusing the same logic for the GCS writer, I just wanted to let you know that this might also impact this PR.
Nice pr!
There is a major problem that I don't understand: why are we using the Amazon S3 client everywhere if we are trying to load data via GCS?
@@ -1,3 +1,13 @@
## Uploading options
There are 2 available options to upload data to bigquery `Standard` and `GCS Staging`.
- `Standard` is option to upload data directly from your source to BigQuery storage. This way is faster and requires less resources than GCS one.
I think we need to explain further when to choose which option. It's not clear to me when I should choose Standard vs GCS Uploading (CSV format).
Updated, thanks
    throws IOException {
  Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());
  AmazonS3 s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig);
Why are we creating an AmazonS3 client if we are using Google Cloud Storage for loading data?
GCS is compatible with the Amazon S3 client. By reusing the S3 client, we can reuse the related code.
Hi @subodh1810. I partially re-used the existing destination-gcs module, which had been created before I started working on this ticket. I had a conversation with @tuliren and he confirmed that the S3 client is also used for destination-gcs, as it works for both in most cases, except for some minor ones.
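Editor's note: the interoperability works because GCS exposes an S3-compatible XML API. Below is a minimal, illustrative sketch of pointing the AWS S3 client at that endpoint with GCS HMAC credentials; the class and method names (GcsViaS3ClientSketch, buildGcsCompatibleClient) are hypothetical and this is not the code in this PR.

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class GcsViaS3ClientSketch {

  // Builds an AmazonS3 client that talks to GCS's S3-compatible endpoint
  // using a GCS HMAC key pair instead of AWS credentials.
  public static AmazonS3 buildGcsCompatibleClient(String hmacKeyId, String hmacKeySecret, String region) {
    BasicAWSCredentials credentials = new BasicAWSCredentials(hmacKeyId, hmacKeySecret);
    return AmazonS3ClientBuilder.standard()
        .withEndpointConfiguration(
            new AwsClientBuilder.EndpointConfiguration("https://storage.googleapis.com", region))
        .withCredentials(new AWSStaticCredentialsProvider(credentials))
        .build();
  }
}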
GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig
    .getGcsDestinationConfig(BigQueryUtils.getGcsJsonNodeConfig(config));
GcsCsvWriter gcsCsvWriter = initGcsWriter(gcsDestinationConfig, configStream);
gcsCsvWriter.initialize();
In the initialize method we are using the AmazonS3 client, and I am not sure I follow why that is.
Hi @subodh1810. The answer here is similar to the previous comment. I partially re-used the already existing destination-gcs module (the GCS destination CSV writer in particular). As far as I understood, destination-gcs itself had been implemented by re-using existing modules from destination-s3, because the Amazon S3 client works for both GCS and S3 storage in most cases. It's better to ask @tuliren for details.
So the idea is the following (see the sketch below):
- Re-using the existing destination-gcs CSV writer, we upload the data to a GCS bucket.
- Using the native BigQuery writer, we create a native load job to move the CSV-formatted data from the bucket to BigQuery.
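Editor's note: a minimal, illustrative sketch of the second step, assuming the google-cloud-bigquery client. It is not the PR's actual code; the class name and the dataset/table/URI parameters are placeholders.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;

public class GcsToBigQueryLoadSketch {

  public static void loadCsvFromGcs(String datasetName, String tableName, String gcsCsvUri)
      throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    TableId tableId = TableId.of(datasetName, tableName);

    // Configure a native load job that reads CSV files staged in GCS and appends them to the table.
    LoadJobConfiguration loadConfig = LoadJobConfiguration.newBuilder(tableId, gcsCsvUri)
        .setFormatOptions(FormatOptions.csv())
        .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
        .build();

    Job job = bigquery.create(JobInfo.of(loadConfig));
    Job completedJob = job.waitFor();
    if (completedJob == null || completedJob.getStatus().getError() != null) {
      throw new RuntimeException("GCS -> BigQuery load job failed: "
          + (completedJob == null ? "job no longer exists" : completedJob.getStatus().getError()));
    }
  }
}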
Can you just add a Javadoc explaining that the Amazon S3 client works with GCS as well?
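Editor's note: a Javadoc along these lines could cover the request. This is suggested wording only, not text from the PR; attach it wherever the GCS-flavoured AmazonS3 client is actually created (e.g. GcsS3Helper.getGcsS3Client).

/**
 * Builds an {@link com.amazonaws.services.s3.AmazonS3} client configured for Google Cloud Storage.
 *
 * <p>GCS exposes an S3-compatible XML API, so the AWS S3 client (authenticated with GCS HMAC
 * keys) is reused here instead of introducing a separate GCS-specific client. This lets the
 * GCS staging path share the existing destination-s3 / destination-gcs code.
 */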
/test connector=connectors/destination-gcs
…-destination # Conflicts: # settings.gradle
/test connector=connectors/destination-bigquery
/test connector=connectors/destination-gcs
/test connector=connectors/destination-bigquery
/publish connector=connectors/destination-bigquery
* Fixed destination bigquery denormalized compilation error (caused by #5614)
    Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP));
// GCS CSV loading supports only certain date/datetime formats, so keep this as a string for now
// https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv#data_types
    Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.STRING),
The type of the column was changed from TIMESTAMP to STRING when creating the table schema, but when formatting the records to insert, they are still typed as timestamp?
Line 153 in 888e9ab
final String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue();
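Editor's note: for context, QueryParameterValue.timestamp(...) renders the microsecond epoch value as a canonical timestamp string, and with the emitted_at column now declared as STRING that text is loaded verbatim rather than parsed as a TIMESTAMP. A small illustrative snippet (the class name is a placeholder; only the quoted call itself comes from the PR):

import com.google.cloud.bigquery.QueryParameterValue;

public class EmittedAtFormattingSketch {

  public static void main(String[] args) {
    long emittedAtMicroseconds = System.currentTimeMillis() * 1000L;
    // Produces a canonical timestamp string, e.g. "2021-09-09 12:41:35.220000+00:00".
    String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue();
    System.out.println(formattedEmittedAt);
  }
}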
OK, so there was an issue filed for this: #5959
@@ -82,6 +82,10 @@ if(!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD")
include ':airbyte-integrations:connectors:destination-redshift'
include ':airbyte-integrations:connectors:destination-snowflake'
include ':airbyte-integrations:connectors:destination-oracle'

//Needed by destination-bugquery
I wish we had a destination-bigquery instead though... 😜
What
Currently, you may see failures for big datasets and slow sources, e.g. if reading from the source takes more than 10-12 hours.
This is caused by limitations of the Google BigQuery SDK client. For more details, please check #3549.
How
There are now two available options to upload data to BigQuery: `Standard` and `GCS Staging`.
- `Standard` uploads data directly from your source to BigQuery storage. This way is faster and requires fewer resources than the GCS one.
- `GCS Uploading (CSV format)` is a newly introduced approach implemented to avoid the issue with big datasets mentioned above. As a first step, all data is uploaded to a GCS bucket, and then it is all moved to BigQuery in one shot, stream by stream.
The destination-gcs connector is partially used under the hood here, so you may check its documentation for more details (a rough sketch of the flow follows below).
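Editor's note: a rough, hypothetical sketch of the per-stream orchestration (stage everything to GCS as CSV, then trigger one load per stream). The interfaces below are placeholders standing in for the real GCS CSV writer and BigQuery loader, not the connector's actual classes.

import java.util.Map;

public class GcsStagingSyncSketch {

  // Hypothetical minimal view of a per-stream CSV writer that stages data in GCS.
  interface StagedCsvWriter {
    void initialize() throws Exception;             // prepare the bucket / open the CSV upload
    void write(String jsonRecord) throws Exception; // append one record to the staged CSV
    String close() throws Exception;                // finish the upload, return the gs:// URI
  }

  // Hypothetical hook that runs a native BigQuery load job for one staged CSV URI.
  interface BigQueryCsvLoader {
    void loadCsvIntoTable(String streamName, String gcsCsvUri) throws Exception;
  }

  // Stage all records to GCS first, then move each stream to BigQuery in a single load job.
  public static void sync(Map<String, StagedCsvWriter> writersByStream,
                          Iterable<Map.Entry<String, String>> records, // (streamName, jsonRecord)
                          BigQueryCsvLoader loader) throws Exception {
    for (StagedCsvWriter writer : writersByStream.values()) {
      writer.initialize();
    }
    for (Map.Entry<String, String> msg : records) {
      writersByStream.get(msg.getKey()).write(msg.getValue());
    }
    for (Map.Entry<String, StagedCsvWriter> entry : writersByStream.entrySet()) {
      String gcsCsvUri = entry.getValue().close();
      loader.loadCsvIntoTable(entry.getKey(), gcsCsvUri);
    }
  }
}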
Pre-merge Checklist
Expand the relevant checklist and delete the others.

New Connector
Community member or Airbyter
- Secrets in the connector's spec are annotated with `airbyte_secret`.
- Integration tests pass locally: `./gradlew :airbyte-integrations:connectors:<name>:integrationTest`.
- Documentation updated:
  - the connector's `README.md`
  - `docs/SUMMARY.md`
  - `docs/integrations/<source or destination>/<name>.md`, including the changelog (see changelog example)
  - `docs/integrations/README.md`
  - `airbyte-integrations/builds.md`
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- The `/test connector=connectors/<name>` command is passing.
- The connector is published with the `/publish` command described here.

Updating a connector
Community member or Airbyter
- Secrets in the connector's spec are annotated with `airbyte_secret`.
- Integration tests pass locally: `./gradlew :airbyte-integrations:connectors:<name>:integrationTest`.
- Documentation updated:
  - the connector's `README.md`
  - `docs/integrations/<source or destination>/<name>.md`, including the changelog (see changelog example)
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- The `/test connector=connectors/<name>` command is passing.
- The connector is published with the `/publish` command described here.

Connector Generator
- The generator test modules (all connectors with `-scaffold` in their name) have been updated with the latest scaffold by running `./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates`, then checking in your changes.